package com.rtsapp.server.network.protocol.rpc.client.impl;

import com.rtsapp.server.network.protocol.ProtocolConstants;
import com.rtsapp.server.network.protocol.rpc.client.IPushHandler;
import com.rtsapp.server.network.protocol.rpc.client.IRPCClientProxy;
import com.rtsapp.server.network.protocol.rpc.client.RPCCallback;
import com.rtsapp.server.network.protocol.rpc.codec.RPCRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import com.rtsapp.server.logger.Logger;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client-side RPC proxy that maintains a pool of connections to a single remote
 * peer and dispatches calls across the connected handlers in round-robin order.
 *
 * <p>{@link #start()} schedules {@code cfg.getNum()} connection attempts on the
 * supplied event loop. Each successful connection contributes one
 * {@link RPCClientInHandler} to the pool; failed attempts are retried after
 * {@link #RECONN_INTERVAL_MILLIS} when reconnecting is requested.
 *
 * <p>Access to the handler list is guarded by synchronizing on the list itself.
 */
public class RPCClientProxy implements IRPCClientProxy{

	protected static final Logger logger =com.rtsapp.server.logger.LoggerFactory.getLogger(  RPCClientProxy.class );

	// Default timeout for synchronous calls, in seconds.
	private static final long DEFAULT_SYNC_TIME_OUT = 120;

	// Delay before a reconnect attempt, in milliseconds.
	private static final long RECONN_INTERVAL_MILLIS = 1000;


	protected final RPCClientImpl.ClientCfg cfg;
	protected final EventLoopGroup eventLoop;
	protected final boolean linux;


	// Handlers of the currently connected channels; guarded by synchronized (handlers).
	private final List<RPCClientInHandler> handlers = new ArrayList<>( );
	// Monotonic counter used to pick the next handler in round-robin order.
	private final AtomicInteger handlerIndex = new AtomicInteger( 0 );

	// Remote address and channel initializer, both created by initClientInitHandler().
	protected SocketAddress remotePeer;
	protected RPCClientInitializer clientInitialHandler;


	// Client identity ID; null until setPushHandler succeeds.
	private String clientId = null ;
	// Handler for messages pushed by the server; null until setPushHandler succeeds.
	private IPushHandler pushHandler = null;



	/**
	 * Creates a proxy; no connection is made until {@link #start()} is called.
	 *
	 * @param cfg       client configuration (remote ip/port, connection count, max frame length, server name)
	 * @param eventLoop event loop group used both for scheduling connects and for channel I/O
	 * @param linux     when true an {@link EpollSocketChannel} is used, otherwise a {@link NioSocketChannel}
	 */
	public RPCClientProxy( RPCClientImpl.ClientCfg cfg, EventLoopGroup eventLoop , boolean linux ){

		this.cfg = cfg;
		this.eventLoop = eventLoop;
		this.linux = linux;

	}

	/**
	 * Initializes the remote address and channel initializer, then schedules
	 * {@code cfg.getNum()} immediate connection attempts. Any failure is logged
	 * and not propagated to the caller.
	 */
	public void start(){

		try {

			initClientInitHandler();

			final int connectionCount = cfg.getNum();
			for( int i = 0; i < connectionCount; i++ ) {
				doConnect( remotePeer, 0 );
			}

		} catch ( Throwable ex ) {
			logger.error( "RPCClient 启动失败", ex );
		}
	}

	/**
	 * Builds {@link #remotePeer} from the configured ip/port and creates the
	 * channel initializer shared by all connections.
	 */
	protected void initClientInitHandler() {
		this.remotePeer = new InetSocketAddress( cfg.getIp() , cfg.getPort( ) );

		// Create the shared channel initializer.
		clientInitialHandler = new RPCClientInitializer( this,cfg.getMaxFrameLength() );
	}


	/**
	 * Schedules a connection attempt on the event loop. Attempts made through
	 * this path always retry on failure.
	 *
	 * @param remotePeer address to connect to
	 * @param delay      delay before the attempt, in milliseconds
	 */
	private void doConnect(   final SocketAddress remotePeer, long delay ){

		this.eventLoop.schedule(  new Runnable() {

			@Override
			public void run() {
				connect( remotePeer, true );
			}
		}, delay, TimeUnit.MILLISECONDS );

	}

	/**
	 * Connects to the remote peer immediately (on the calling thread).
	 *
	 * <p>On success the channel's {@link RPCClientInHandler} is added to the pool.
	 * On failure, or if setting up the connect throws, a reconnect is scheduled
	 * when {@code isReconnect} is true.
	 *
	 * @param remotePeer  address to connect to
	 * @param isReconnect whether a failed attempt should be retried
	 */
	protected void connect(final SocketAddress remotePeer, final boolean isReconnect ) {
		try {
			Bootstrap b = new Bootstrap();
			b.group( eventLoop );
			if( linux ) {
				b.channel( EpollSocketChannel.class );
			} else {
				b.channel( NioSocketChannel.class );
			}
			b.handler( clientInitialHandler );

			ChannelFuture connectFuture = b.connect( remotePeer );
			connectFuture.addListener( new ChannelFutureListener() {

				@Override
				public void operationComplete(final ChannelFuture future) throws Exception {
					if ( future.isSuccess() ) {
						logger.info("Successfully connect to remote server. objName=" + cfg.getServerName() + "|remote peer=" + remotePeer);
						RPCClientInHandler handler = future.channel().pipeline().get(RPCClientInHandler.class);
						addHandler(handler);
						return;
					}

					logger.info("Can't connect to remote server. serverName=" + cfg.getServerName() + "|remote peer=" + remotePeer.toString());
					if (isReconnect) {
						reconnect(remotePeer);
					}
				}

			});

		} catch (Exception e) {
			logger.error("doConnect got exception|msg=" + e.getMessage(), e);
			if( isReconnect ) {
				reconnect(remotePeer);
			}
		}
	}


	/**
	 * Schedules a reconnect via {@link #doConnect} after {@link #RECONN_INTERVAL_MILLIS}.
	 *
	 * @param remotePeer address to reconnect to
	 */
	private void reconnect(  final SocketAddress remotePeer ){
		doConnect( remotePeer, RECONN_INTERVAL_MILLIS );
	}

	/**
	 * Removes the given handler from the pool and, only if it was actually
	 * present, schedules one reconnect. The presence check prevents spawning
	 * extra connections when this is invoked more than once for the same handler.
	 *
	 * @param handler handler whose connection is no longer usable
	 */
	void removeHandlerAndReconnect(  RPCClientInHandler handler ){

		if( removeHandler( handler ) ) {
			reconnect(this.remotePeer);
		}
	}

	/**
	 * Adds a newly connected handler to the pool and asks it to register the
	 * client ID. Registration is repeated on every (re)connection because each
	 * new handler starts unregistered.
	 *
	 * @param handler handler of the freshly connected channel
	 */
	private void addHandler( RPCClientInHandler handler ){
		synchronized ( this.handlers ) {

			this.handlers.add(handler);

			// TODO: test that the peer actually receives this message when sent right after connect.
			handler.sendRegisterClientId( );
		}
	}

	/**
	 * Removes a handler from the pool.
	 *
	 * @param handler handler to remove
	 * @return true if the handler was in the pool
	 */
	private boolean removeHandler( RPCClientInHandler handler ){
		synchronized ( this.handlers ) {
			return this.handlers.remove(handler);
		}
	}

	/**
	 * Invokes a remote method on one of the connected handlers.
	 *
	 * <p>If the last element of {@code args} is an {@link RPCCallback}, the call is
	 * asynchronous: the callback is stripped from the parameters, attached to the
	 * pending future, and {@code Boolean.TRUE} is returned immediately. Otherwise
	 * the call is synchronous and blocks for up to {@link #DEFAULT_SYNC_TIME_OUT}
	 * seconds waiting for the result.
	 *
	 * @param service remote object/service name
	 * @param funName remote method name
	 * @param args    call parameters, optionally followed by an {@link RPCCallback}
	 * @return {@code true} for asynchronous calls, otherwise the value produced by the future
	 * @throws RuntimeException if no connected handler is available, or if waiting
	 *                          for a synchronous result fails (cause attached)
	 */
	@Override
	public Object call(  String service, String funName, Object...args ){

		RPCClientInHandler handler = getChannelHandler( );

		if( handler == null ){
			logger.debug(  "server not found error" );
			throw new RuntimeException( "server not found error" );
		}

		final boolean isAsync = args != null && args.length > 0 && args[ args.length -1 ] instanceof RPCCallback;

		final Object[] parameters;
		final RPCCallback callback;
		if( isAsync ){
			// Asynchronous call: the trailing callback is not a remote parameter.
			parameters = new Object[ args.length -1 ];
			System.arraycopy( args, 0, parameters, 0, parameters.length );
			callback = (RPCCallback)args[ args.length -1 ];
		}else{
			// Synchronous call: every argument is a remote parameter.
			parameters = args;
			callback = null;
		}

		RPCRequest rpcRequest = new RPCRequest();
		rpcRequest.setObjectName(service);
		rpcRequest.setMethodName(funName);
		rpcRequest.setParameters(parameters);
		rpcRequest.setRpcType( isAsync
				? ProtocolConstants.RPCConstants.RPCType.async
				: ProtocolConstants.RPCConstants.RPCType.normal );

		RPCFuture future = handler.sendRPC(rpcRequest);

		if( isAsync ){
			future.addCallback(callback);
			return true;
		}

		try {
			return future.get( DEFAULT_SYNC_TIME_OUT , TimeUnit.SECONDS );
		} catch ( Throwable  e) {
			// Catching Throwable would otherwise swallow an interrupt; restore the flag for callers.
			if( e instanceof InterruptedException ){
				Thread.currentThread().interrupt();
			}
			String errorMsg =  "远程调用出错, service="+service+", name=" + funName + ", timout=" +  DEFAULT_SYNC_TIME_OUT + "秒";
			logger.error( errorMsg );
			throw new RuntimeException( errorMsg , e );
		}
	}




	/**
	 * Picks the next handler in round-robin order.
	 *
	 * @return a connected handler, or null when the pool is empty
	 */
	private RPCClientInHandler getChannelHandler(  ) {

		synchronized ( this.handlers ) {
			if ( handlers.isEmpty() ) {
				return null;
			}
			// floorMod keeps the index non-negative after the counter wraps past Integer.MAX_VALUE;
			// a plain % would yield a negative index and throw IndexOutOfBoundsException.
			return handlers.get( Math.floorMod( handlerIndex.getAndIncrement(), handlers.size() ) );
		}

	}


	/**
	 * Sets the client ID and the server-push handler. Only the first call takes
	 * effect; later calls are logged as errors and ignored.
	 *
	 * @param clientId    client identity ID, must not be null
	 * @param pushHandler handler for server-pushed messages, must not be null
	 * @throws IllegalArgumentException if either argument is null
	 */
	@Override
	public void setPushHandler( String clientId, IPushHandler pushHandler ){
		if( clientId == null || pushHandler == null ){
			 throw new IllegalArgumentException( "参数String clientId, IPushHandler pushHandler 都不能为空" );
		}

		if( this.clientId != null ){
			logger.error( "无效的重复设置clientId:{}", clientId );
			return;
		}

		this.clientId = clientId;
		this.pushHandler = pushHandler;

		// Handlers that are already connected will not pass through addHandler again,
		// so ask each of them to register the ID now.
		synchronized ( this.handlers ) {
			for( RPCClientInHandler handler : handlers ){
				handler.sendRegisterClientId();
			}
		}
	}


	/**
	 * @return the client identity ID, or null if none has been set
	 */
	public String getClientId() {
		return clientId;
	}

	/**
	 * @return the server-push handler, or null if none has been set
	 */
	public IPushHandler getPushHandler(){
		return pushHandler;
	}

}
